{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "import threading\n",
    "from kafka import KafkaProducer\n",
    "from kafka import KafkaConsumer\n",
    "import sys,time,json\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "consumer = KafkaConsumer(bootstrap_servers='192.168.1.105:9092')\n",
    "consumer = KafkaConsumer('BizPlatform_Filter_DataSync', group_id='my_favorite_group2')\n",
    "#BizPlatform_Filter_DataSync,BizPlatform_DataSync_UpChannel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "start\n"
     ]
    }
   ],
   "source": [
    "def kafkaConsumer():\n",
    "    print(\"start\")\n",
    "    for msg in consumer:\n",
    "        display=msg.value.decode()\n",
    "        res=json.loads(display)\n",
    "        #display[\"data\"][personList]\n",
    "        print(display)\n",
    "    print(\"stop\")\n",
    "threading.Thread(target=kafkaConsumer ).start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import base64\n",
    "import uuid,copy\n",
    "sys.path.append(\"..\")\n",
    "from sjlib import sjPath\n",
    "ROOT_PATH=\"Z:/share_path/\"\n",
    "def filterTopic_uplord(savePath=\"/\"):\n",
    "    for msg in consumer:\n",
    "        display=msg.value.decode()\n",
    "        res=json.loads(display)\n",
    "        faces=copy.deepcopy(res)\n",
    "        groupId=str(uuid.uuid4())\n",
    "        def saveImg(faces):\n",
    "            \n",
    "            for (pid,face) in faces.items():\n",
    "                avatar = face;img = base64.b64decode(avatar)\n",
    "\n",
    "                file_name=sjPath.departWrite(ROOT_PATH,pid+groupId+\".jpg\")\n",
    "                with  open(file_name,'wb' ) as f:\n",
    "                    f.write(img)\n",
    "                \n",
    "                face=file_name\n",
    "                \n",
    "        faces=faces[\"data\"][\"personImages\"]\n",
    "        saveImg(faces)\n",
    "        faces=faces[\"data\"][\"vehicleImages\"]\n",
    "        saveImg(faces)\n",
    "        res[\"groupId\"]=groupId \n",
    "        print(res)\n",
    "    return\n",
    "filterTopic_uplord(ROOT_PATH)\n",
    "#     faces=faces[\"data\"][\"personImages\"]\n",
    "#     del faces[\"background_image\"]\n",
    "#     for (pid,face) in faces.items():\n",
    "#         avatar = face;img = base64.b64decode(avatar)\n",
    "\n",
    "#         file_name=sjPath.departWrite(ROOT_PATH,str(uuid.uuid4())+\".jpg\")\n",
    "\n",
    "#         id_info[\"image_path\"]=file_name#+图片路径\n",
    "#         with  open(file_name,'wb' ) as f:\n",
    "#             f.write(img)\n",
    "#         print(face)\n",
    "        \n",
    "# out "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "\n",
    "t={\"c\":124,\"sc\":3,\"d\":{\"ss\":35},\"d1\":[123,2423,23] }\n",
    "tt=copy.deepcopy(t)\n",
    "\n",
    "tt[\"c\"]=245;tt[\"d\"][\"ss\"]=23;tt[\"d1\"]=[24] ;tt[\"d\"]={\"2\":234}\n",
    "print(t)\n",
    "print(tt)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "ename": "KeyboardInterrupt",
     "evalue": "",
     "output_type": "error",
     "traceback": [
      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[1;31mKeyboardInterrupt\u001b[0m                         Traceback (most recent call last)",
      "\u001b[1;32m<ipython-input-23-0bcebd267da1>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[0;32m      2\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m      3\u001b[0m \u001b[0mcnt\u001b[0m\u001b[1;33m=\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 4\u001b[1;33m \u001b[1;32mfor\u001b[0m \u001b[0mmsg\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mconsumer\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m      5\u001b[0m     \u001b[0mdisplay\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mmsg\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mvalue\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdecode\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m      6\u001b[0m     \u001b[0mdisplay\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mjson\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mloads\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mdisplay\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32mc:\\program files\\python36\\lib\\site-packages\\kafka\\consumer\\group.py\u001b[0m in \u001b[0;36m__next__\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m   1116\u001b[0m         \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_set_consumer_timeout\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m   1117\u001b[0m         \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m-> 1118\u001b[1;33m             \u001b[1;32mreturn\u001b[0m \u001b[0mnext\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_iterator\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m   1119\u001b[0m         \u001b[1;32mexcept\u001b[0m \u001b[0mStopIteration\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m   1120\u001b[0m             \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_iterator\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;32mNone\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32mc:\\program files\\python36\\lib\\site-packages\\kafka\\consumer\\group.py\u001b[0m in \u001b[0;36m_message_generator\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m   1060\u001b[0m             \u001b[1;32mif\u001b[0m \u001b[1;32mnot\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_fetcher\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0min_flight_fetches\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m   1061\u001b[0m                 \u001b[0mpoll_ms\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mmin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mpoll_ms\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mconfig\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;34m'reconnect_backoff_ms'\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m-> 1062\u001b[1;33m             \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_client\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mpoll\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtimeout_ms\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mpoll_ms\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m   1063\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m   1064\u001b[0m             \u001b[1;31m# after the long poll, we should check whether the group needs to rebalance\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32mc:\\program files\\python36\\lib\\site-packages\\kafka\\client_async.py\u001b[0m in \u001b[0;36mpoll\u001b[1;34m(self, timeout_ms, future)\u001b[0m\n\u001b[0;32m    554\u001b[0m                     \u001b[0mtimeout\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mmax\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtimeout\u001b[0m \u001b[1;33m/\u001b[0m \u001b[1;36m1000\u001b[0m\u001b[1;33m)\u001b[0m  \u001b[1;31m# avoid negative timeouts\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    555\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 556\u001b[1;33m                 \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_poll\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    557\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    558\u001b[0m             \u001b[1;31m# called without the lock to avoid deadlock potential\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32mc:\\program files\\python36\\lib\\site-packages\\kafka\\client_async.py\u001b[0m in \u001b[0;36m_poll\u001b[1;34m(self, timeout)\u001b[0m\n\u001b[0;32m    572\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    573\u001b[0m         \u001b[0mstart_select\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mtime\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mtime\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 574\u001b[1;33m         \u001b[0mready\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_selector\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mselect\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    575\u001b[0m         \u001b[0mend_select\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mtime\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mtime\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    576\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_sensors\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32mc:\\program files\\python36\\lib\\selectors.py\u001b[0m in \u001b[0;36mselect\u001b[1;34m(self, timeout)\u001b[0m\n\u001b[0;32m    321\u001b[0m         \u001b[0mready\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;33m[\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    322\u001b[0m         \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 323\u001b[1;33m             \u001b[0mr\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mw\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0m_\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_select\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_readers\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_writers\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m[\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtimeout\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    324\u001b[0m         \u001b[1;32mexcept\u001b[0m \u001b[0mInterruptedError\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    325\u001b[0m             \u001b[1;32mreturn\u001b[0m \u001b[0mready\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32mc:\\program files\\python36\\lib\\selectors.py\u001b[0m in \u001b[0;36m_select\u001b[1;34m(self, r, w, _, timeout)\u001b[0m\n\u001b[0;32m    312\u001b[0m     \u001b[1;32mif\u001b[0m \u001b[0msys\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mplatform\u001b[0m \u001b[1;33m==\u001b[0m \u001b[1;34m'win32'\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    313\u001b[0m         \u001b[1;32mdef\u001b[0m \u001b[0m_select\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mr\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mw\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0m_\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtimeout\u001b[0m\u001b[1;33m=\u001b[0m\u001b[1;32mNone\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 314\u001b[1;33m             \u001b[0mr\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mw\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mx\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mselect\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mselect\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mr\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mw\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mw\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtimeout\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    315\u001b[0m             \u001b[1;32mreturn\u001b[0m \u001b[0mr\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mw\u001b[0m \u001b[1;33m+\u001b[0m \u001b[0mx\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m[\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    316\u001b[0m     \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;31mKeyboardInterrupt\u001b[0m: "
     ]
    }
   ],
   "source": [
    "# 显示 =======\n",
    "\n",
    "cnt=0\n",
    "for msg in consumer:\n",
    "    display=msg.value.decode()\n",
    "    display=json.loads(display)\n",
    "#     del display[\"data\"][\"personImages\"]\n",
    "#     del display[\"data\"][\"vehicleImages\"]\n",
    "    display=json.dumps(display,sort_keys=True, indent=4, separators=(',', ':'),ensure_ascii=False)\n",
    "    print(display)    \n",
    "    cnt+=1\n",
    "    print(cnt)\n",
    "    break"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "start\n"
     ]
    }
   ],
   "source": [
    "from kafka import KafkaProducer\n",
    "from kafka import KafkaConsumer\n",
    "import sys,time,threading\n",
    "\n",
    "#consumer = KafkaConsumer()\n",
    "\n",
    "def kConsumer():\n",
    "    consumer = KafkaConsumer('__consumer_offsets', group_id='group__consumer_offsets',bootstrap_servers='192.168.1.201:9092')\n",
    "    print(\"start\")\n",
    "    for msg in consumer:\n",
    "        display=msg.values.decode()\n",
    "        #del display[\"feature\"]\n",
    "        print (display[:100],time.time())\n",
    "    print(\"stop\")\n",
    "#BizPlatform_Filter_DataSync\n",
    "kConsumer()\n",
    "#threading.Thread(target=kConsumer ).start()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
